package com.hbase.hbasedatacollection;

import java.io.File;
import java.util.Date;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import com.google.common.collect.Queues;
import com.hbase.dao.HBaseBaseDao;
import com.hbase.dao.HBaseDaoImpl;
import com.hbase.dao.constant.HBaseConstant;
import com.hbase.dao.exception.HBaseDaoException;
import com.hbase.dao.pool.HTablePoolEngine;
import com.hbase.hbasedatacollection.entity.FilePropertyEntity;
import com.hbase.hbasedatacollection.util.FileUtil;

/**
 * Command-line entry point that records file properties in HBase.
 * <p>
 * {@link #main(String[])} asks {@code FileUtil.list} to fill a shared queue
 * with files from a data-source directory, then {@link #run(int)} starts a
 * fixed pool of workers. Each worker polls the queue and, for every file,
 * inserts a {@code FilePropertyEntity} (name, full path, size, last-modified
 * time) through an {@code HBaseBaseDao}.
 * 
 * @author yangshenhui
 * @version 1.0
 */
public class HBaseDataCollection {
	private static final Logger logger = Logger
			.getLogger(HBaseDataCollection.class);

	/** Files waiting to be processed; shared by every worker. */
	private static final Queue<File> fileQueue = Queues
			.newLinkedBlockingQueue();

	/** ZooKeeper address used when none is given on the command line. */
	private static final String DEFAULT_ZOOKEEPER_IPS = "redhat";

	/** Directory scanned when none is given on the command line. */
	private static final String DEFAULT_DATA_SOURCES = "D:\\java";

	/**
	 * Match rules handed to {@code FileUtil.list}. NOTE(review): an empty
	 * string presumably means "no filtering" - confirm against FileUtil.
	 */
	private static final String FILE_MATCH_RULES = "";

	/**
	 * Configures HBase, queues the files of the data-source directory and
	 * processes them with one worker per available processor.
	 * 
	 * @param args
	 *            optional overrides: {@code args[0]} is the ZooKeeper address
	 *            (default {@code "redhat"}), {@code args[1]} is the
	 *            data-source directory (default {@code "D:\java"}). Missing
	 *            arguments fall back to the defaults.
	 * @throws InterruptedException
	 *             if the main thread is interrupted while waiting for the
	 *             workers to finish
	 */
	public static void main(String[] args) throws InterruptedException {
		String zookeeperIps = argumentOrDefault(args, 0, DEFAULT_ZOOKEEPER_IPS);
		logger.debug(zookeeperIps);
		try {
			HTablePoolEngine.createHBaseConfiguration(
					HBaseConstant.HBASE_DEFAULT_INSTANCE, zookeeperIps);
		} catch (HBaseDaoException e) {
			// Log at ERROR with the stack trace so the failure is visible.
			// NOTE(review): execution deliberately continues, as it always
			// did; confirm whether collection can work without this
			// configuration or whether the program should stop here.
			logger.error("Failed to create HBase configuration, zookeeperIps="
					+ zookeeperIps, e);
		}
		String dataSources = argumentOrDefault(args, 1, DEFAULT_DATA_SOURCES);
		logger.debug(dataSources);
		int threadCount = Runtime.getRuntime().availableProcessors();
		FileUtil.list(fileQueue, dataSources, FILE_MATCH_RULES);
		HBaseDataCollection hBaseDataCollection = new HBaseDataCollection();
		hBaseDataCollection.run(threadCount);
	}

	/**
	 * Returns {@code args[index]} when it is present, otherwise the fallback.
	 */
	private static String argumentOrDefault(String[] args, int index,
			String fallback) {
		if (args != null && index < args.length && args[index] != null) {
			return args[index];
		}
		return fallback;
	}

	/**
	 * Starts {@code threadCount} workers on a fixed thread pool and blocks
	 * until all of them have finished. Every worker stops as soon as it finds
	 * the shared file queue empty, so the queue must be filled before this
	 * method is called.
	 * 
	 * @param threadCount
	 *            number of pool threads and of workers; must be positive
	 * @throws IllegalArgumentException
	 *             if {@code threadCount} is not positive (raised by
	 *             {@link Executors#newFixedThreadPool(int)})
	 * @throws InterruptedException
	 *             if the calling thread is interrupted while waiting
	 */
	public void run(int threadCount) throws InterruptedException {
		ExecutorService executorService = Executors
				.newFixedThreadPool(threadCount);
		try {
			for (int i = 1; i <= threadCount; ++i) {
				// Hand the Runnable straight to the pool: wrapping it in a
				// Thread adds nothing, and execute() (unlike submit()) does
				// not hide an uncaught exception inside an unread Future.
				executorService.execute(new HBaseDataCollectionThread(
						"threadName-" + i));
			}
		} finally {
			// Always stop accepting work, even if creating a worker failed,
			// so idle pool threads cannot keep the JVM alive.
			executorService.shutdown();
		}
		while (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
			// Not terminated yet: keep waiting in 30-second slices.
		}
	}

	/**
	 * Worker that drains the shared file queue and inserts one
	 * {@code FilePropertyEntity} per file. It is a static nested class because
	 * it needs no state of the enclosing instance.
	 */
	private static class HBaseDataCollectionThread implements Runnable {
		private final String threadName;
		private final HBaseBaseDao hBaseDao;

		/**
		 * @param threadName
		 *            label written to the debug log when the worker starts
		 */
		public HBaseDataCollectionThread(String threadName) {
			this.threadName = threadName;
			this.hBaseDao = new HBaseDaoImpl();
		}

		/**
		 * Polls files until the queue is empty. A failure on one file is
		 * logged and does not stop the worker.
		 */
		@Override
		public void run() {
			logger.debug(threadName);
			File file;
			while ((file = fileQueue.poll()) != null) {
				try {
					store(file);
				} catch (RuntimeException e) {
					// Worker boundary: report the problem and move on to the
					// next file instead of abandoning the rest of the queue.
					logger.error("Unexpected failure while processing "
							+ file.getAbsolutePath(), e);
				}
			}
		}

		/**
		 * Builds the entity describing {@code file} and inserts it.
		 */
		private void store(File file) {
			FilePropertyEntity filePropertyEntity = new FilePropertyEntity();
			// NOTE(review): the id is the bare file name, so two files with
			// the same name in different directories share an id - confirm
			// this is intended before relying on it as a unique key.
			filePropertyEntity.setId(file.getName());
			filePropertyEntity.setFileName(file.getName());
			filePropertyEntity.setFullPath(file.getAbsolutePath());
			filePropertyEntity.setFileSize(file.length());
			filePropertyEntity.setLastModified(new Date(file.lastModified()));
			try {
				if (!hBaseDao.insertObject(filePropertyEntity)) {
					logger.error("HBase insert returned false for "
							+ filePropertyEntity);
				}
			} catch (HBaseDaoException e) {
				// Pass the exception as the throwable so the stack trace is
				// kept in the log.
				logger.warn("HBase insert failed for "
						+ file.getAbsolutePath(), e);
			}
		}

	}

}